Python: keep a process pool from idling by dequeuing and submitting single tasks (non-batch) immediately

Authors
  • Wang Zhiwei

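Purpose:

Initialize a process pool. Whenever the pool has a free slot and tasks are still waiting, submit one more task to it. In other words, a new task is added as soon as any task completes, instead of waiting for a whole batch to finish before adding the next batch. This keeps the workers in the pool busy.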
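
To make the benefit concrete, here is a rough sketch of an idealized timing model that compares the two dispatch strategies. It ignores scheduling overhead, and the function names `batch_makespan` and `refill_makespan` as well as the task durations are made up for illustration; they are not part of the original example.

```python
import heapq


def batch_makespan(durations, workers):
    """Total time when tasks run in fixed batches of `workers` tasks.

    Each batch lasts as long as its slowest task, and the next batch
    starts only after the whole batch has finished.
    """
    total = 0
    for i in range(0, len(durations), workers):
        total += max(durations[i:i + workers])
    return total


def refill_makespan(durations, workers):
    """Total time when a new task starts as soon as any worker is free."""
    # Min-heap of the times at which each worker becomes free.
    free_at = [0] * workers
    finish = 0
    for d in durations:
        start = heapq.heappop(free_at)  # earliest available worker
        end = start + d
        finish = max(finish, end)
        heapq.heappush(free_at, end)
    return finish
```

With the hypothetical durations `[5, 1, 1, 1, 5, 1]` and 3 workers, this model gives 10 time units for batch dispatch and 6 for immediate refill, because in the batch case two workers sit idle while the slow task of each batch finishes.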


A simple example

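import os
import random
import time
from concurrent.futures import FIRST_COMPLETED, ProcessPoolExecutor, wait
from queue import Queue


def runner(N):
    # Simulate a task that takes 1 to 10 seconds.
    sleeptime = random.randint(1, 10)
    time.sleep(sleeptime)
    print(f"{os.getpid()}\ttask {N} finished")


tasks = list(range(10))
workers = 3
wait_time = 1  # seconds to block in wait() before reporting progress

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=workers) as executor:
        # Running futures mapped to their task number.
        # Only the main process touches this dict.
        rq = dict()
        # Pending tasks. The queue never leaves the main process, so the
        # in-process queue.Queue is enough. multiprocessing.Queue.empty()
        # is documented as unreliable, which could end the loop below early.
        tq = Queue(maxsize=len(tasks))
        for task in tasks:
            tq.put_nowait(task)

        # The main process polls the pool and dispatches tasks one at a time.
        while not tq.empty():
            if len(rq) < workers:
                # A slot is free: submit the next pending task right away.
                N = tq.get()
                future = executor.submit(runner, N=N)
                rq[future] = N
                print(f"task {N} submitted")
            else:
                # The pool is full: block until at least one future
                # completes or the timeout expires.
                done, not_done = wait(rq, timeout=wait_time, return_when=FIRST_COMPLETED)
                if not done:
                    print("Waiting for tasks to finish")
                    continue
                for future in done:
                    del rq[future]
                    print(f"task {future} removed from rq")

    # Leaving the with block waits for the tasks that are still running.
    print("All tasks done")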

Sample run

task 0 submitted
task 1 submitted
task 2 submitted
Waiting for tasks to finish
Waiting for tasks to finish
74421	task 0 finished
task <Future at 0x102d478b0 state=finished returned NoneType> removed from rq
task 3 submitted
Waiting for tasks to finish
74422	task 1 finished
74423	task 2 finished
task <Future at 0x102d598e0 state=finished returned NoneType> removed from rq
task 4 submitted
task <Future at 0x102d599d0 state=finished returned NoneType> removed from rq
task 5 submitted
Waiting for tasks to finish
74421	task 3 finished
task <Future at 0x102d59e20 state=finished returned NoneType> removed from rq
task 6 submitted
74422	task 4 finished
task <Future at 0x102d478b0 state=finished returned NoneType> removed from rq
task 7 submitted
Waiting for tasks to finish
74423	task 5 finished
task <Future at 0x102d59c70 state=finished returned NoneType> removed from rq
task 8 submitted
Waiting for tasks to finish
Waiting for tasks to finish
74422	task 7 finished
task <Future at 0x102d59880 state=finished returned NoneType> removed from rq
task 9 submitted
74423	task 8 finished
74421	task 6 finished
74422	task 9 finished
All tasks done
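
The dispatch loop from the example can also be packaged as a reusable helper. The following is a minimal sketch, not the author's code: `run_with_refill`, `square`, and the `limit` parameter are hypothetical names introduced here. Unlike the example above, it blocks in `wait()` without a timeout, collects return values, and calls `future.result()` so that an exception raised inside a task is re-raised in the main process instead of being silently dropped. It assumes the items are hashable and unique, since they are used as dictionary keys.

```python
from collections import deque
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def run_with_refill(executor, fn, items, limit):
    """Run fn(item) for every item, keeping at most `limit` futures in flight.

    Returns a dict mapping each item to its result.
    """
    pending = deque(items)
    running = {}  # future -> item
    results = {}  # item -> return value
    while pending or running:
        # Top up the pool until `limit` futures are in flight.
        while pending and len(running) < limit:
            item = pending.popleft()
            running[executor.submit(fn, item)] = item
        # Block until at least one running future has finished.
        done, _ = wait(running, return_when=FIRST_COMPLETED)
        for future in done:
            item = running.pop(future)
            # result() re-raises any exception the task raised.
            results[item] = future.result()
    return results


def square(n):
    # Stand-in task; defined at module level so it can also be pickled
    # and sent to a process pool.
    return n * n
```

A call such as `run_with_refill(executor, square, range(10), limit=3)` works with any `concurrent.futures` executor. `ThreadPoolExecutor` is imported only as a convenient way to try the helper; with `ProcessPoolExecutor`, the call should sit under an `if __name__ == "__main__":` guard as in the example above.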